BigQuery Pipeline

Google Cloud Datalab's pipeline subcommand enables productionizing (i.e. scheduling and orchestrating) notebooks that perform ETL with BigQuery and GCS. It uses Apache Airflow (https://airflow.apache.org/start.html) as the underlying orchestration and scheduling technology.

Disclaimer: This is still in the experimental stage.

Setup

Google Cloud Composer

Set up a Google Cloud Composer environment using the instructions here: https://cloud.google.com/composer/docs/quickstart, and specify 'datalab' as a Python dependency using the instructions here: https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies. The examples in the cells below assume that a Composer environment is available.
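
For reference, creating the environment and installing the 'datalab' dependency typically comes down to a couple of gcloud commands along the lines of the sketch below. The environment name ('my-composer-env') and the requirements file (containing the single line 'datalab') are placeholders; the documentation linked above is authoritative.

In [ ]:
# Sketch only -- follow the Composer documentation linked above for the
# authoritative steps. 'my-composer-env' and 'requirements.txt' (a file
# containing the line 'datalab') are placeholders. Uncomment to run.
#!gcloud composer environments create my-composer-env --location us-central1
#!gcloud composer environments update my-composer-env --location us-central1 --update-pypi-packages-from-file requirements.txt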

Airflow

Alternatively, you can set up your own VM with Airflow running as a long-running process. Run the "Airflow Setup" notebook (under samples/contrib/pipeline/); it sets up a GCE VM with the Airflow scheduler and the dashboard web server.

The pipeline subcommand in the cells below requires either the Composer setup or the Airflow setup for the pipelines to be deployed successfully.


In [1]:
import datetime
import google.datalab.bigquery as bq
import google.datalab.contrib.bigquery.commands
import google.datalab.contrib.pipeline.airflow
import google.datalab.contrib.pipeline.composer
import google.datalab.kernel
import google.datalab.storage as storage
from google.datalab import Context

project = Context.default().project_id

# Composer variables (change these as per your preference)
environment = 'rajivpb-composer-next'
location = 'us-central1'

# Airflow setup variables
vm_name = 'datalab-airflow'
gcs_dag_bucket_name = project + '-' + vm_name
gcs_dag_file_path = 'dags'

# Set up the GCS bucket and BQ dataset
bucket_name = project + '-bq_pipeline'
bucket = storage.Bucket(bucket_name)
bucket.create()
print(bucket.exists())
dataset_name = 'bq_pipeline'
dataset = bq.Dataset(dataset_name)
dataset.create()
print(dataset.exists())

# Start and end timestamps for our pipelines. 
start = datetime.datetime.now()
formatted_start = start.strftime('%Y%m%dT%H%M%S')
end = start + datetime.timedelta(minutes=5)


True
True

In [2]:
%bq pipeline -h


usage: %bq pipeline [-h] -n NAME [-d GCS_DAG_BUCKET] [-f GCS_DAG_FILE_PATH]
                    [-e ENVIRONMENT] [-l LOCATION] [-g DEBUG]

Creates a GCS/BigQuery ETL pipeline. The cell-body is specified as follows:
  input:
    table | path: <BQ table name or GCS path; both if path->table load is also required>
    schema: <For syntax, refer '%%bq execute'>
    format: {csv (default) | json}
    csv: <This section is relevant only when 'format' is 'csv'>
      delimiter: <The field delimiter to use; default is ','>
      skip: <Number of rows at the top of a CSV file to skip; default is 0>
      strict: <{True | False (default)}; whether to accept rows with missing trailing (or optional) columns>
      quote: <Value used to quote data sections; default is '"'>
    mode: <{append (default) | overwrite}; applicable if path->table load>
  transformation: <optional; when absent, a direct conversion is done from input (path|table) to output (table|path)>
    query: <name of BQ query defined via "%%bq query --name ...">
  output:
    table | path: <BQ table name or GCS path; both if table->path extract is required>
    mode: <{append | overwrite | create (default)}; applicable only when table is specified>
    format: <{csv (default) | json}>
    csv: <This section is relevant only when 'format' is 'csv'>
      delimiter: <the field delimiter to use. Defaults to ','>
      header: <{True (default) | False}; Whether to include an initial header line>
      compress: <{True | False (default) }; Whether to compress the data on export>
  schedule:
    start: <formatted as '%%Y-%%m-%%dT%%H:%%M:%%S'; default is 'now'>
    end:  <formatted as '%%Y-%%m-%%dT%%H:%%M:%%S'; default is 'forever'>
    interval: <{@once (default) | @hourly | @daily | @weekly | @monthly | @yearly | <cron ex>}>
    catchup: <{True | False (default)}; when True, backfill is performed for start and end times>
    retries: Number of attempts to run the pipeline; default is 0
    retry_delay_seconds: Number of seconds to wait before retrying the task
  emails: <comma separated list of emails to notify in case of retries, failures, etc.>
  parameters: <For syntax, refer '%%bq execute'>

optional arguments:
  -h, --help            show this help message and exit
  -n NAME, --name NAME  BigQuery pipeline name
  -d GCS_DAG_BUCKET, --gcs_dag_bucket GCS_DAG_BUCKET
                        The Google Cloud Storage bucket for the Airflow dags.
  -f GCS_DAG_FILE_PATH, --gcs_dag_file_path GCS_DAG_FILE_PATH
                        The file path suffix for the Airflow dags.
  -e ENVIRONMENT, --environment ENVIRONMENT
                        The name of the Google Cloud Composer environment.
  -l LOCATION, --location LOCATION
                        The location of the Google Cloud Composer environment. Refer https://cloud.google.com/about/locations/ for further details.
  -g DEBUG, --debug DEBUG
                        Debug output with the airflow spec.

Building a data transformation pipeline

The pipeline subcommand deploys and orchestrates an ETL pipeline. It supports specifying either an existing BQ table or a GCS path (with an accompanying schema) as the data input, executing a transformation with BQ SQL, and producing the results as output (again, either a BQ table or a GCS path). The pipeline can be executed on a schedule, and parameters can be specified to templatize or customize it.


In [3]:
github_archive = 'githubarchive.month.201802'

In [4]:
%%bq query --name my_pull_request_events
SELECT id, created_at, repo.name FROM input
WHERE actor.login = 'rajivpb' AND type = 'PullRequestEvent'

In [5]:
# We designate the following 'output' for our pipeline. 
results_table = project + '.' + dataset_name + '.' + 'pr_events_' + formatted_start

# Pipeline name is made unique by suffixing a timestamp
pipeline_name = 'github_once_' + formatted_start

In [6]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
input:
  table: $github_archive
transformation:
  query: my_pull_request_events
output:
  table: $results_table
  mode: overwrite
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True


Out[6]:
u'Composer pipeline successfully deployed! View dashboard for more details.\n'

When the above cell is run, a pipeline is deployed and the results of the query are written to the BQ results table (i.e. $results_table). It can take 5-10 minutes after the cell is executed for the results table to show up. Below, we'll see additional examples of alternate ways to specify the source, the supported source types, and ways to customize the pipeline.
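
One way to check whether the output has appeared is to poll the results table interactively with the datalab BigQuery API; a minimal sketch:

In [ ]:
# Poll for the pipeline's output; this prints False until the pipeline run
# has completed and written the results table.
print(bq.Table(results_table).exists())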

Parameterization

The parameters section provides the ability to customize the inputs and outputs of the pipeline. These parameters are merged with the SQL query parameters into a single list, and are specified in the cell body (along the same lines as the %bq execute command).

In addition to user-defined parameters, the following built-in mapping keys are available for formatting strings; they are designed to capture common scenarios for parameterizing the pipeline with its execution timestamp (the sketch after this list shows how they are substituted).

  • '_ds': the date formatted as YYYY-MM-DD
  • '_ts': the full ISO-formatted timestamp YYYY-MM-DDTHH:MM:SS.mmmmmm
  • '_ds_nodash': the date formatted as YYYYMMDD (i.e. YYYY-MM-DD with 'no dashes')
  • '_ts_nodash': the timestamp formatted as YYYYMMDDTHHMMSSmmmmmm (i.e. the full ISO-formatted timestamp without dashes or colons)
  • '_ts_year': 4-digit year
  • '_ts_month': '1'-'12'
  • '_ts_day': '1'-'31'
  • '_ts_hour': '0'-'23'
  • '_ts_minute': '0'-'59'
  • '_ts_second': '0'-'59'
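
Since these placeholders use Python's %(key)s mapping syntax, the expansion can be previewed locally. The sketch below uses hypothetical sample values purely for illustration; at execution time the pipeline binds the built-in keys from the actual execution timestamp and merges in the user-defined parameters.

In [ ]:
# Illustration only: hypothetical sample values for the built-in keys and the
# user-defined parameters, just to show how the placeholders expand.
sample_bindings = {
    '_ts_year': '2017',
    '_ts_month': '12',
    '_ts_nodash': '20171201T000000000000',
    'project': 'my-project',
    'dataset_name': 'bq_pipeline',
    'user': 'rajivpb',
}
print('githubarchive.month.%(_ts_year)s%(_ts_month)s' % sample_bindings)
# -> githubarchive.month.201712
print('%(project)s.%(dataset_name)s.%(user)s_pr_events_%(_ts_nodash)s' % sample_bindings)
# -> my-project.bq_pipeline.rajivpb_pr_events_20171201T000000000000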

In [7]:
# The source/input is formatted with the built-in mapping keys _ts_year and 
# _ts_month and these are evaluated (or "bound") at the time of pipeline 
# execution. This could be at some point in the future, or at some point in the 
# "past" in cases where a backfill job is being executed.
github_archive_current_month = 'githubarchive.month.%(_ts_year)s%(_ts_month)s'

# The destination/output is formatted with additional user-defined parameters
# 'project', 'dataset', and 'user'. These are evaluated/bound at the time of 
# execution of the %bq pipeline cell. 
results_table = '%(project)s.%(dataset_name)s.%(user)s_pr_events_%(_ts_nodash)s'

pipeline_name = 'github_parameterized_' + formatted_start

In [8]:
%%bq query --name my_pull_request_events
SELECT id, created_at, repo.name FROM input
WHERE actor.login = @user AND type = 'PullRequestEvent'

In [9]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
input:
  table: $github_archive_current_month
transformation:
  query: my_pull_request_events
output:
  table: $results_table
  mode: overwrite
parameters:
  - name: user
    type: STRING
    value: 'rajivpb'
  - name: project
    type: STRING
    value: $project
  - name: dataset_name
    type: STRING
    value: $dataset_name
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True


Out[9]:
u'Composer pipeline successfully deployed! View dashboard for more details.\n'

SQL-based data transformation pipeline for GCS data

pipeline also supports specifying GCS paths as both the input (accompanied by a schema) and output, thus completely bypassing the specification of any BQ tables. Garbage collection of all intermediate BQ tables is handled for the user.


In [10]:
gcs_input_path = 'gs://cloud-datalab-samples/cars.csv'
gcs_output_path = 'gs://%(bucket_name)s/all_makes_%(_ts_nodash)s.csv'
pipeline_name = 'gcs_to_gcs_transform_' + formatted_start

In [11]:
%%bq query --name all_makes
SELECT Make FROM input

In [12]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
input:
  path: $gcs_input_path
  schema:
    - name: Year
      type: INTEGER
    - name: Make
      type: STRING
    - name: Model
      type: STRING
    - name: Description
      type: STRING
    - name: Price
      type: FLOAT
  csv:
    skip: 1
transformation: 
  query: all_makes
output:
  path: $gcs_output_path
parameters:
  - name: bucket_name
    type: STRING
    value: $bucket_name
schedule:
  start: $start
  end:  $end
  interval: '@once'
  catchup: True


Out[12]:
u'Composer pipeline successfully deployed! View dashboard for more details.\n'

Load data from GCS into BigQuery

pipeline can also be used to parameterize and schedule the loading of data from GCS into BQ, i.e. the equivalent of the %bq load command.
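
For comparison, the one-off interactive equivalent is a direct load via the datalab BigQuery API. The sketch below is illustrative only: the table name is a placeholder, and the exact Table.create/Table.load/CSVOptions argument names are assumptions about the google.datalab.bigquery API rather than something exercised in this notebook.

In [ ]:
# Sketch of a one-off, interactive load (no scheduling). The table name is a
# placeholder, and the argument names below are assumptions about the
# google.datalab.bigquery API.
cars_schema = bq.Schema([
    {'name': 'Year', 'type': 'INTEGER'},
    {'name': 'Make', 'type': 'STRING'},
    {'name': 'Model', 'type': 'STRING'},
    {'name': 'Description', 'type': 'STRING'},
    {'name': 'Price', 'type': 'FLOAT'},
])
interactive_table = bq.Table(project + '.' + dataset_name + '.cars_load_interactive')
interactive_table.create(schema=cars_schema, overwrite=True)
interactive_table.load(gcs_input_path, mode='append', source_format='csv',
                       csv_options=bq.CSVOptions(skip_leading_rows=1))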


In [13]:
bq_load_results_table = '%(project)s.%(dataset_name)s.cars_load'
pipeline_name = 'load_gcs_to_bq_' + formatted_start

In [14]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
load:
  path: $gcs_input_path
  schema:
    - name: Year
      type: INTEGER
    - name: Make
      type: STRING
    - name: Model
      type: STRING
    - name: Description
      type: STRING
    - name: Price
      type: FLOAT
  csv:
    skip: 1
  table: $bq_load_results_table
  mode: overwrite
parameters:
  - name: project
    type: STRING
    value: $project
  - name: dataset_name
    type: STRING
    value: $dataset_name
schedule:
  start: $start
  end: $end
  interval: '@once'
  catchup: True


Out[14]:
u'Composer pipeline successfully deployed! View dashboard for more details.\n'

Extract data from BigQuery into GCS

Similar to load, pipeline can also be used to perform the equivalent of the %bq extract command. To illustrate, we extract the data in the table that was the result of the 'load' pipeline, and write it to a GCS file.

Note that if you "Run All Cells" in this notebook, this pipeline may be deployed at the same time as the previous load pipeline, in which case the source table may not be ready yet. Hence we set retries to 3, with a delay of 90 seconds between attempts, so that the pipeline succeeds once the table has been created.
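
For comparison once more, the one-off interactive equivalent of this step is a direct extract via the datalab BigQuery API; the sketch below is illustrative only (the destination filename is a placeholder and the extract argument names are assumptions about the google.datalab.bigquery Table API).

In [ ]:
# Sketch of a one-off, interactive extract (no scheduling or retries). The
# destination filename is a placeholder, and the argument names are
# assumptions about the google.datalab.bigquery Table API.
source_table = bq.Table(project + '.' + dataset_name + '.cars_load')
source_table.extract('gs://' + bucket_name + '/cars_extract_interactive.csv',
                     format='csv', csv_delimiter='#', csv_header=True)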


In [15]:
gcs_extract_path = 'gs://%(bucket_name)s/cars_extract_%(_ts_nodash)s.csv'
pipeline_name = 'extract_bq_to_gcs_' + formatted_start

In [16]:
%%bq pipeline --name $pipeline_name -e $environment -l $location
extract:
  table: $bq_load_results_table
  path: $gcs_extract_path
  format: csv
  csv:
    delimiter: '#'
parameters:
  - name: bucket_name
    type: STRING
    value: $bucket_name
  - name: project
    type: STRING
    value: $project
  - name: dataset_name
    type: STRING
    value: $dataset_name
schedule:
  start: $start
  interval: '@once'
  catchup: True
  retries: 3
  retry_delay_seconds: 90


Out[16]:
u'Composer pipeline successfully deployed! View dashboard for more details.\n'

Output of successful pipeline runs


In [17]:
# You will see two files named all_makes_<timestamp> and cars_extract_<timestamp>
# under the bucket:
!gsutil ls gs://$bucket_name
  
# You will see three tables named cars_load, pr_events_<timestamp> and 
# <user>_pr_events_<timestamp> under the BigQuery dataset:
!bq ls $dataset_name

Cleanup


In [ ]:
# Delete the contents of the GCS bucket, the GCS bucket itself, and the BQ 
# dataset. Uncomment the lines below and execute.
#!gsutil rm -r gs://$bucket_name
#!bq rm -r -f $dataset_name

Stop Billing


In [ ]:
# If you chose the Airflow VM option (in the Setup), this will stop the VM. Uncomment the
# line below and execute.
#!gcloud compute instances stop $vm_name --zone us-central1-b --quiet

In [ ]:
# This just verifies that cleanup actually worked. Run this after running the 
# 'Cleanup' cell

#Should show two error messages like "BucketNotFoundException: 404 gs://..."
!gsutil ls gs://$bucket_name
!gsutil ls gs://$gcs_dag_bucket_name/dags
  
#Should show an error message like "BigQuery error in ls operation: Not found ..."
!bq ls $dataset_name